home *** CD-ROM | disk | FTP | other *** search
/ Cream of the Crop 26 / Cream of the Crop 26.iso / os2 / pvm34b3.zip / pvm34b3 / pvm3 / src / pvmdshmem.c < prev    next >
C/C++ Source or Header  |  1997-07-22  |  26KB  |  1,030 lines

  1.  
  2. static char rcsid[] =
  3.     "$Id: pvmdshmem.c,v 1.14 1997/07/02 20:27:32 pvmsrc Exp $";
  4.  
  5. /*
  6.  *         PVM version 3.4:  Parallel Virtual Machine System
  7.  *               University of Tennessee, Knoxville TN.
  8.  *           Oak Ridge National Laboratory, Oak Ridge TN.
  9.  *                   Emory University, Atlanta GA.
  10.  *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
  11.  *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
  12.  *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
  13.  *                   (C) 1997 All Rights Reserved
  14.  *
  15.  *                              NOTICE
  16.  *
  17.  * Permission to use, copy, modify, and distribute this software and
  18.  * its documentation for any purpose and without fee is hereby granted
  19.  * provided that the above copyright notice appear in all copies and
  20.  * that both the copyright notice and this permission notice appear in
  21.  * supporting documentation.
  22.  *
  23.  * Neither the Institutions (Emory University, Oak Ridge National
  24.  * Laboratory, and University of Tennessee) nor the Authors make any
  25.  * representations about the suitability of this software for any
  26.  * purpose.  This software is provided ``as is'' without express or
  27.  * implied warranty.
  28.  *
  29.  * PVM version 3 was funded in part by the U.S. Department of Energy,
  30.  * the National Science Foundation and the State of Tennessee.
  31.  */
  32.  
  33. /*
  34.  * pvmdshmem.c
  35.  *
  36.  * Shared-memory MPP interface.
  37.  *
  38. $Log: pvmdshmem.c,v $
  39.  * Revision 1.14  1997/07/02  20:27:32  pvmsrc
  40.  *     Fixed startup race on shmem to that a shmem task can get fully
  41.  *     configured before getting any messages.
  42.  *     This involved adding two states
  43.  *     TF_PRESHMCONN and TF_SHM.  TF_PRESHMCONN indicates that messages
  44.  *     with MM_PRIO set can be sent to a task, but regular messages are
  45.  *     queued. This allows shmem tasks to be completely configured
  46.  *     before any messages flow.  When the daemon changes the state from
  47.  *     TF_PRESHMCONN to TF_SHMCONN it calls shm_wrt_pkts to write any
  48.  *     packets that were queued before task state changed to TF_SHMCONN.
  49.  *
  50.  * Revision 1.13  1997/06/27  20:54:41  pvmsrc
  51.  * Allow forked process to be caught by daemon
  52.  *     when its not ignoring SIGCLD.
  53.  *
  54.  * Revision 1.12  1997/06/25  22:09:34  pvmsrc
  55.  * Markus adds his frigging name to the author list of
  56.  *     every file he ever looked at...
  57.  *
  58.  * Revision 1.11  1997/06/24  20:39:19  pvmsrc
  59.  * Eliminated unnecessary global externs.
  60.  *
  61.  * Revision 1.10  1997/06/16  13:42:10  pvmsrc
  62.  * Upated forkexec arg list.
  63.  *
  64.  * Revision 1.9  1997/06/02  13:50:03  pvmsrc
  65.  * Added missing #include host.h for waitc.h.
  66.  *
  67.  * Revision 1.8  1997/06/02  13:26:43  pvmsrc
  68.  * Changed mpp_input so that spawned processes can be claimed after
  69.  *     exiting.  Previously they were claimed as their shadow t0 died
  70.  *     and set a flag in work() that would get caught leading to
  71.  *     mpp_dredge() finding them sometime later...
  72.  *
  73.  * Revision 1.7  1997/05/21  16:01:53  pvmsrc
  74.  * Updated ifdefs to include AIX4MP arch type.
  75.  *
  76.  * Revision 1.6  1997/05/19  15:03:01  pvmsrc
  77.  * pvmfrgsiz is now set according to memory page size and msg header
  78.  *     lengths instead of just the UDP host-host packet size!
  79.  * Mesg headers are set correctly in mpp_input / mpp_output to include
  80.  *     contexts etc
  81.  *
  82.  * Revision 1.5  1997/04/25  19:17:29  pvmsrc
  83.  * Mashed to make it work within 3.4 new messaging system. Added:
  84.  *     -    correct includes
  85.  *     -    changed ph_cod's to ph_tag's and pk_cod -> pk_tag
  86.  *     -    add duplicate of pvmdunix ppi_kill()
  87.  *
  88.  * Revision 1.4  1997/01/28  19:27:23  pvmsrc
  89.  * New Copyright Notice & Authors.
  90.  *
  91.  * Revision 1.3  1996/10/25  13:58:01  pvmsrc
  92.  * Replaced old #includes for protocol headers:
  93.  *     - <pvmsdpro.h>, "ddpro.h", "tdpro.h"
  94.  * With #include of new combined header:
  95.  *     - <pvmproto.h>
  96.  *
  97.  * Revision 1.2  1996/10/24  21:04:50  pvmsrc
  98.  * Moved #include of "global.h" down below other headers:
  99.  *     - need to have all of the structures / types declared before
  100.  *         the globals can be declared...
  101.  *
  102.  * Revision 1.1  1996/09/23  23:44:33  pvmsrc
  103.  * Initial revision
  104.  *
  105.  * Revision 1.20  1995/11/02  16:31:23  manchek
  106.  * skip over stale packets from dead tasks in mpp_input
  107.  *
  108.  * Revision 1.19  1995/09/05  19:24:05  manchek
  109.  * mpp_input copies all pages for now (in case sender task exits)
  110.  *
  111.  * Revision 1.18  1995/07/25  17:35:32  manchek
  112.  * mpp_probe cancels retry to a task if not ready.
  113.  * mpp_output returns success code
  114.  *
  115.  * Revision 1.17  1995/07/24  21:48:25  manchek
  116.  * mpp_output puts pkts in order (at the end) of ovfpkts.
  117.  * mpp_probe consumes ovfpkts in order to retry
  118.  *
  119.  * Revision 1.16  1995/07/24  19:22:33  manchek
  120.  * message, frag headers passed in inbox shmpkhdr instead of databuf
  121.  * changes in mpp_input, mpp_output.
  122.  * removed mpp_mcast - wasn't called
  123.  *
  124.  * Revision 1.15  1995/07/12  01:12:14  manchek
  125.  * do nothing in mpp_free if tid is zero.
  126.  * peer_detach now frees peer struct.
  127.  * mpp_dredge can reclaim pidtid slot immediately if task not registered
  128.  *
  129.  * Revision 1.14  1995/07/05  16:21:40  manchek
  130.  * added ST_FINISH to pidtid_dump
  131.  *
  132.  * Revision 1.13  1995/07/05  16:16:29  manchek
  133.  * added mpp_dredge function to skim pidtid table for zombies and call
  134.  * task_cleanup and task_free.
  135.  * mpp_free (called from task_free) now reclaims pidtid table entry
  136.  *
  137.  * Revision 1.12  1995/07/03  19:55:47  manchek
  138.  * hellish cleanup of comments and formatting.
  139.  * removed POWER4 ifdefs.
  140.  * removed removeshm().
  141.  * added mpp_setstatus() and pidtid_dump()
  142.  *
  143.  * Revision 1.11  1995/06/28  15:50:57  manchek
  144.  * added arg to peer_conn calls
  145.  *
  146.  * Revision 1.10  1995/06/19  17:45:01  manchek
  147.  * inc refcount before signalling in mpp_output
  148.  *
  149.  * Revision 1.9  1995/06/02  16:21:10  manchek
  150.  * fixed references to detached memory segments
  151.  *
  152.  * Revision 1.8  1995/05/22  19:51:37  manchek
  153.  * added ifdefs for RS6KMP
  154.  *
  155.  * Revision 1.7  1995/05/18  17:22:25  manchek
  156.  * need to export pvminbox and myshmbufid
  157.  *
  158.  * Revision 1.6  1995/05/17  16:41:29  manchek
  159.  * changed global mytid to pvmmytid.
  160.  * changed inbox to pvminbox and mybufid to myshmbufid.
  161.  * added support for CSPP shared memory.
  162.  * unset TF_FORKD flag if task doesn't reconnect with expected pid
  163.  *
  164.  * Revision 1.5  1995/02/06  22:42:01  manchek
  165.  * new function mpp_setmtu, called before slave_config
  166.  *
  167.  * Revision 1.4  1995/02/01  21:35:06  manchek
  168.  * added nenv and envp args to mpp_load, which passes them to forkexec
  169.  *
  170.  * Revision 1.3  1994/11/08  15:35:07  manchek
  171.  * shared memory damage control
  172.  *
  173.  */
  174.  
  175.  
  176. #include <sys/param.h>
  177. #include <sys/types.h>
  178. #include <sys/time.h>
  179. #include <netinet/in.h>
  180. #include <fcntl.h>
  181. #ifdef IMA_SYMM
  182. #include <sys/file.h>        /* XXX for open(); change to fcntl.h in ptx? */
  183. #include <parallel/parallel.h>
  184. #endif
  185. #include <stdio.h>
  186. #include <string.h>
  187. #include <errno.h>
  188. #include <sys/ipc.h>
  189. #include <sys/shm.h>
  190. #include <sys/sem.h>
  191. #include <unistd.h>
  192. #include <stdlib.h>
  193.  
  194. #include <pvm3.h>
  195. #include <pvmproto.h>
  196. #include "pvmalloc.h"
  197. #include "pvmdabuf.h"
  198. #include "pkt.h"
  199. #include "task.h"
  200. #include "listmac.h"
  201. #include "pvmshmem.h"
  202. #include "bfunc.h"
  203. #include "host.h"
  204. #include "waitc.h"
  205. #include "global.h"
  206.  
  207. #ifndef max
  208. #define max(a,b)    ((a)>(b)?(a):(b))
  209. #endif
  210.  
  211. #ifndef min
  212. #define min(a,b)    ((a)<(b)?(a):(b))
  213. #endif
  214.  
  215. char *getenv();
  216.  
  217.  
  218. extern int pvmdebmask;            /* from pvmd.c */
  219. extern int pvm_useruid;            /* from pvmd.c */
  220. extern int pvmmytid;            /* from pvmd.c */
  221. extern int pvmmydsig;            /* from pvmd.c */
  222. extern int pvmudpmtu;            /* from pvmd.c */
  223. extern int pvmmyupid;            /* from pvmd.c */
  224. extern struct peer *peers;        /* from pvmshmem.c */
  225.  
  226.  
  227. /***************
  228.  **  Globals  **
  229.  **           **
  230.  ***************/
  231.  
  232. char *outmsgbuf = 0;            /* outgoing message buffer */
  233. int outbufsz = 0;                /* size of outgoing msg buffer */
  234. int nbufsowned = 0;                /* num shared frags held by us */
  235. int pgsz = 0;                    /* page size */
  236. int pvmpgsz = 0;                /* PVM virtual page size */
  237. char *infopage = 0;                /* proto, NDF, pid-tid table */
  238. struct pidtid *pidtids = 0;        /* pid -> tid table */
  239. int maxpidtid = 0;                /* size of pid-tid table */
  240. int shmbufsiz = 0;                /* shared-memory buffer size */
  241. int myshmbufid = -1;            /* shared-memory msg buffer ID */
  242. char *pvminbox = 0;                /* incoming message header buffer */
  243. extern int pvmfrgsiz;            /* From pvmd.c frag size */
  244.                                 /* included here as we change it in ppi_config*/
  245.  
  246. /***************
  247.  **  Private  **
  248.  **           **
  249.  ***************/
  250.  
  251. static char pvmtxt[512];        /* scratch for error log */
  252. static int inboxsz = 0;            /* size of incoming message buffer */
  253. static struct pkt *ovfpkts = 0;    /* packets waiting to be delivered */
  254. static int globid = -1;            /* ID of global shared segment */
  255.  
  256.  
  257. /*
  258.  *            Buffer layout
  259.  *         ____________________
  260.  *        |       inbox        | 1
  261.  *        |____________________|
  262.  *        |   pid->tid table   | 2
  263.  *        |____________________|
  264.  *        |                    | 3
  265.  *        |      outmsgbuf     |
  266.  *      |          .         | .
  267.  *        |          .         | .
  268.  *        |          .         | .
  269.  *
  270.  *
  271.  * Note: tasks don't keep a pid-tid table, so their outgoing message
  272.  * buffer starts at the second page.
  273.  *
  274.  * To send messages to one another, the sender puts the message header,
  275.  * which contains its own task ID and the location of the message body
  276.  * (expressed as an offset from the head of the segment), into the inbox
  277.  * of the intended receiver. When the addressee is ready to accept the
  278.  * message, it reads the message header and locates the message fragments
  279.  * in the sender's outgoing message buffer. Access to buffers are guarded
  280.  * locks. Tasks can read the same data simultaneously, but one must obtain
  281.  * exclusive access to the page before it can modify the data.
  282.  *
  283.  * Messages to tasks on other hosts are routed by pvmd.
  284.  */
  285.  
  286.  
  287. int
  288. ppi_config(argc, argv)
  289.     int argc;
  290.     char **argv;
  291. {
  292.     pgsz = sysconf(_SC_PAGESIZE);
  293.     pvmudpmtu = pgsz - PVMPAGEHDR; /* XXX yuck host to host limited */
  294.     pvmfrgsiz = pvmudpmtu;    /* host-task limit.. not same as UDP host to host */
  295.                     /* except h-h is not same as it should be, see above */
  296.     return 0;
  297. }
  298.  
  299.  
  300. /*    ppi_init()
  301. *
  302. *    Create our shared memory segment, initialize stuff, etc.
  303. *
  304. *    XXX shouldn't just return if something goes wrong
  305. */
  306.  
  307. void
  308. ppi_init()
  309. {
  310.     struct pidtidhdr *pvminfo;
  311.     char *p;
  312.     int key;
  313.  
  314. #ifdef LOG
  315.     char fname[32];
  316.     FILE *logfp;
  317. #ifdef IMA_CSPP
  318.     int scid = get_scid();
  319.     if (scid > 1)
  320.         sprintf(fname, "/tmp/pvmt.%d.%d", pvm_useruid, scid);
  321.     else
  322. #else
  323.     sprintf(fname, "/tmp/pvmt.%d", pvm_useruid);
  324. #endif
  325.     logfp = fopen(fname, "w");
  326.     fclose(logfp);
  327. #endif    /*LOG*/
  328.  
  329.     pvmpgsz = FRAGPAGE*pgsz;
  330.     inboxsz =
  331.         (INBOXPAGE*pgsz - sizeof(struct msgboxhdr))/sizeof(struct shmpkhdr);
  332.     if (!(p = getenv("PVMBUFSIZE")) || !(shmbufsiz = strtol(p, (char**)0, 0)))
  333.         shmbufsiz = SHMBUFSIZE;
  334.  
  335.     key = pvmshmkey(0);
  336.     if ((myshmbufid = shmget((key_t)key, shmbufsiz, IPC_CREAT|PERMS)) == -1) {
  337.         pvmlogperror("ppi_init() can't create msg buffer");
  338.         pvmbailout(0);
  339.         return;
  340.     }
  341. #ifdef IMA_CSPP
  342.     if ((pvminbox = (char *)shm_search(myshmbufid)) == (char *)-1L)
  343. #else
  344.     if ((pvminbox = (char *)shmat(myshmbufid, 0, 0)) == (char *)-1L)
  345. #endif
  346.     {
  347.         pvmlogperror("ppi_init() can't attach msg buffer");
  348.         shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
  349.         myshmbufid = -1;
  350.         pvmbailout(0);
  351.         return;
  352.     }
  353.  
  354.     infopage = pvminbox + INBOXPAGE*pgsz;
  355.     outmsgbuf = infopage + pgsz;
  356.     if (!(outbufsz = (shmbufsiz - INBOXPAGE*pgsz - pgsz)/pvmpgsz)) {
  357.         pvmlogerror("ppi_init() SHMBUFSIZE too small!");
  358.         shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0);
  359.         myshmbufid = -1;
  360.         pvmbailout(0);
  361.         return;
  362.     }
  363.     nbufsowned = 0;
  364.  
  365.     msgbufinit(pvminbox);
  366. #ifndef IMA_KSR1
  367.     PAGEINITLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  368. #endif
  369.     ((struct shmpghdr *)infopage)->pg_ref = 0;
  370.     pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);
  371.     pvminfo->i_proto = TDPROTOCOL;
  372.     pvminfo->i_dsig = pvmmydsig;
  373.     pvminfo->i_next = 0;
  374.     pvminfo->i_bufsiz = shmbufsiz;
  375.     pvminfo->i_dpid = pvmmyupid;
  376.     pidtids = (struct pidtid *)(pvminfo + 1);
  377.     maxpidtid = (pgsz - sizeof(struct pidtidhdr) - PVMPAGEHDR)/sizeof(struct pidtid);
  378.     BZERO((char *)pidtids, sizeof(struct pidtid)*maxpidtid);
  379.     {
  380.         int i;
  381.  
  382.         for (i = 0; i < maxpidtid; i++)
  383.             pidtids[i].pt_stat = ST_EXIT;
  384.     }
  385.  
  386.     peer_init();
  387.  
  388.     ovfpkts = pk_new(0);
  389.     ovfpkts -> pk_link = ovfpkts->pk_rlink = ovfpkts;
  390.  
  391. /*
  392. #if defined(SUN4SOL2)
  393.     sprintf(pvmtxt, "%ld CPUs online\n", sysconf(_SC_NPROCESSORS_ONLN));
  394.     sprintf(pvmtxt, "%ld CPUs online\n", sysconf(15));
  395.     pvmlogerror(pvmtxt);
  396. #endif
  397. */
  398. }
  399.  
  400.  
  401. /*    mpp_free()
  402. *
  403. *    Remove shared resources for tid.
  404. *    Delete it from peers list (possibly removing segment and semaphore).
  405. *    Delete its pidtid table entry.
  406. */
  407.  
  408. void
  409. mpp_free(tid)
  410.     int tid;
  411. {
  412.     struct peer *pp;
  413.     struct task *tp;
  414.     int i;
  415.  
  416.     if (pvmdebmask & (PDMTASK|PDMNODE)) {
  417.         sprintf(pvmtxt, "mpp_free() t%x\n", tid);
  418.         pvmlogerror(pvmtxt);
  419.     }
  420.  
  421.     if (!tid)
  422.         return;
  423.  
  424.     for (pp = peers->p_link; pp != peers; pp = pp->p_link) {
  425.         if (pp->p_tid == tid) {
  426.             peer_detach(pp);
  427.             break;
  428.         }
  429.     }
  430.  
  431.     PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
  432.     for (i = 0; i < maxpidtid; i++)
  433.         if (pidtids[i].pt_tid == tid) {
  434.             pidtids[i].pt_stat = ST_EXIT;
  435.             break;
  436.         }
  437.     PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  438. }
  439.  
  440.  
  441. /*    mpp_conn()
  442. *
  443. *    Fill in pid->tid->key table for a new task.
  444. *    This isn't really "connection" (done by peer_conn).
  445. */
  446.  
  447. int
  448. mpp_conn(tp)
  449.     struct task *tp;
  450. {
  451.     int firstidx, idx;
  452.     struct pidtidhdr *pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);
  453.  
  454.     PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
  455.     firstidx = idx = pvminfo->i_next;
  456.     while (pidtids[idx].pt_stat != ST_EXIT) {
  457.         if (++idx == maxpidtid)
  458.             idx = 0;
  459.         if (idx == firstidx) {
  460.             PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  461.             pvmlogerror("mpp_conn() pidtid table full\n");
  462.             return PvmOutOfRes;
  463.         }
  464.     }
  465.     if ((firstidx = idx + 1) == maxpidtid)
  466.         firstidx = 0;
  467.     pvminfo->i_next = firstidx;
  468.  
  469.     /* XXX so wtf good is an almost-refcount? */
  470.     if (((struct shmpghdr *)infopage)->pg_ref < maxpidtid)
  471.         ((struct shmpghdr *)infopage)->pg_ref++;
  472.     pidtids[idx].pt_tid = tp->t_tid;
  473.     pidtids[idx].pt_ptid = tp->t_ptid;
  474.     pidtids[idx].pt_stat = ST_NOTREADY;
  475.     pidtids[idx].pt_pid = tp->t_pid;
  476.     pidtids[idx].pt_key = 0;
  477.     pidtids[idx].pt_cond = 0;
  478.     PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  479.     if (pvmdebmask & (PDMTASK|PDMMEM)) {
  480.         sprintf(pvmtxt, "mpp_conn() assigning pidtid[%d] to t%x\n",
  481.                 idx, tp->t_tid);
  482.         pvmlogerror(pvmtxt);
  483.     }
  484.     if (pvmdebmask & PDMTASK) {
  485.         sprintf(pvmtxt, "mpp_conn() new task t%x\n", tp->t_tid);
  486.         pvmlogerror(pvmtxt);
  487.     }
  488.     return 0;
  489. }
  490.  
  491.  
  492. void
  493. mpp_input()
  494. {
  495.     int next;
  496.     struct pkt *pp;
  497.     struct peer *pe;
  498.     struct task *tp;
  499.     int tid;
  500.     int sdr;
  501.     int src;
  502.     int dst;
  503.     int len;
  504.     char *cp, *buf;
  505.     struct msgboxhdr *inbp;
  506.     struct shmpkhdr *inmsgs;
  507.  
  508.     inbp = (struct msgboxhdr *)pvminbox;
  509.     inmsgs = (struct shmpkhdr *)(inbp + 1);
  510.     do {
  511.         next = (inbp->mb_read + 1) % inboxsz;
  512.         sdr = inmsgs[next].ph_sdr;
  513.         src = inmsgs[next].ph_src;
  514.  
  515.         if (inmsgs[next].ph_dat < 0) {        /* new task */
  516.             int ipid;
  517.  
  518.             if (!(ipid = inmsgs[next].ph_dst))
  519.                 ipid = src;
  520.             if (!(tp = task_findpid(ipid))) {
  521.                 /* not spawned by us */
  522.                 if ((tid = tid_new()) < 0) {
  523.                     pvmlogerror("mpp_input() out of tids?\n");
  524.                     continue;
  525.                 }
  526.                 tp = task_new(tid);
  527.                 task_setpid(tp, src);
  528.                 mpp_conn(tp);
  529.  
  530.             } else if (tp->t_pid != src) {
  531.                 task_setpid(tp, src);
  532.                 tp->t_flag &= ~TF_FORKD;
  533.             }
  534.             tp->t_flag |= TF_CONN | TF_SHM;  
  535.             continue;
  536.         }
  537.  
  538.         if (!(pe = peer_conn(sdr, (int *)0)) || pe == (struct peer *)-1L) {
  539.             sprintf(pvmtxt, "mpp_input() can't connect to sender t%x\n", sdr);
  540.             pvmlogerror(pvmtxt);
  541.             continue;
  542.         }
  543.         cp = pe->p_buf + INBOXPAGE*pgsz + inmsgs[next].ph_dat;
  544.         buf = cp - (inmsgs[next].ph_dat & (pgsz-1)) + PVMPAGEHDR;
  545.         dst = inmsgs[next].ph_dst;
  546.         len = inmsgs[next].ph_len;
  547.  
  548.         /*
  549.         * must copy all packets (for now).
  550.         * sender may exit, and we don't check if we have page references
  551.         * into a segment before we detach.
  552.         */
  553. /*
  554.         if (TIDISHERE(dst, pvmmytid) && TIDISTASK(dst)) {
  555. */
  556.             pp = pk_new(len + MAXHDR);
  557.             pp->pk_dat += MAXHDR;
  558.             BCOPY(cp, pp->pk_dat, len);
  559.             da_unref(buf);
  560.             cp = pp->pk_dat;
  561.  
  562. /*
  563.         } else {
  564.             pp = pk_new(0);
  565.             pp->pk_dat = cp;
  566.             pp->pk_buf = buf;
  567.             pp->pk_max = pvmudpmtu;
  568.         }
  569. */
  570.         pp->pk_len = len;
  571.         pp->pk_src = src;
  572.         pp->pk_dst = dst;
  573.         pp->pk_flag = inmsgs[next].ph_flag;
  574.         pp->pk_tag = inmsgs[next].ph_tag;
  575.         pp->pk_ctx = inmsgs[next].ph_ctx;
  576.         pp->pk_enc = inmsgs[next].ph_enc;
  577.         pp->pk_wid = inmsgs[next].ph_wid;
  578.         pp->pk_crc = inmsgs[next].ph_crc;
  579.  
  580.         if (tp = task_find(src)) {
  581.             loclinpkt(tp, pp);
  582.  
  583.             /* CL tasks caught here. forked ones caught by SIGCHLD/SIGCLD */
  584.             if ((tp->t_flag & TF_CLOSE) && !(tp->t_flag & TF_FORKD)) { 
  585.                 task_cleanup(tp);
  586.                 task_free(tp);
  587.             }
  588.  
  589.         } else {
  590.             sprintf(pvmtxt, "mpp_input() from unknown task t%x\n", src);
  591.             pvmlogerror(pvmtxt);
  592.             pk_free(pp);
  593.         }
  594.  
  595.     } while ((inbp->mb_read = next) != inbp->mb_last);
  596. }
  597.  
  598.  
  599. /*    mpp_output()
  600. *
  601. *    Send packet to a task if it's connected, otherwise queue it in
  602. *    a list to be retried later.
  603. *
  604. *    Returns 0 if packet sent, else 1.
  605. */
  606.  
  607. int
  608. mpp_output(tp, pp)
  609.     struct task *tp;
  610.     struct pkt *pp;
  611. {
  612.     struct peer *pe;
  613.     int dst;
  614.     struct shmpkhdr *dmsgs = 0;
  615.     struct pkt *pp1, *pp2;
  616.     struct msgboxhdr *dboxp;
  617.     char *cp;
  618.     int loc;
  619.     int next;
  620.  
  621.     dst = pp->pk_dst;
  622.  
  623.     /*
  624.     * if page is private, copy and replace it with one in shared buf
  625.     */
  626.  
  627.     if ((loc = pp->pk_dat - outmsgbuf) > outbufsz * pvmpgsz || loc < 0) {
  628.         if (nbufsowned == outbufsz) {
  629.             static int once = 1;
  630.  
  631.             if (once) {
  632.                 pvmlogerror("mpp_output() Message(s) too long for shared buffer, deadlocked.\n");
  633.                 once = 0;
  634.             }
  635.         }
  636.  
  637.         cp = 0;
  638.         do {
  639.             if (cp)
  640.                 da_unref(cp);
  641.             cp = da_new(MAXHDR + pp->pk_len);
  642.         } while ((loc = cp - outmsgbuf) > outbufsz*pvmpgsz || loc < 0);
  643.  
  644.         BCOPY(pp->pk_dat, cp + MAXHDR, pp->pk_len);
  645.         pp->pk_dat = cp + MAXHDR;
  646.         da_unref(pp->pk_buf);
  647.         pp->pk_buf = cp;
  648.     }
  649.  
  650.     if ((pe = peer_conn(dst, (int *)0)) && pe != (struct peer *)-1L) {
  651.         dboxp = (struct msgboxhdr *)pe->p_buf;
  652.         dmsgs = (struct shmpkhdr *)(dboxp + 1);
  653.         PAGELOCK(&dboxp->mb_lock);
  654.         if ((next = (dboxp->mb_last + 1) % inboxsz) != dboxp->mb_read) {
  655.             if (pvmdebmask & PDMPACKET) {
  656.                 sprintf(pvmtxt,
  657.                     "mpp_output() src t%x dst t%x ff %x len %d\n",
  658.                     pp->pk_src, pp->pk_dst, pp->pk_flag & (FFSOM|FFEOM),
  659.                     pp->pk_len);
  660.                 pvmlogerror(pvmtxt);
  661.             }
  662.             dmsgs[next].ph_src = pp->pk_src;
  663.             dmsgs[next].ph_dst = dst;
  664.             dmsgs[next].ph_sdr = pvmmytid;
  665.             dmsgs[next].ph_dat = loc;
  666.             dmsgs[next].ph_len = pp->pk_len;
  667.             dmsgs[next].ph_flag = pp->pk_flag & (FFSOM|FFEOM);
  668.             dmsgs[next].ph_ctx = pp->pk_ctx;
  669.             dmsgs[next].ph_tag = pp->pk_tag;
  670.             dmsgs[next].ph_enc = pp->pk_enc;
  671.             dmsgs[next].ph_wid = pp->pk_wid;
  672.             dmsgs[next].ph_crc = pp->pk_crc;
  673.             da_ref(pp->pk_buf);
  674.             dboxp->mb_last = next;
  675.  
  676.             if (dboxp->mb_sleep) {
  677. #if defined(IMA_SUNMP) || defined(IMA_RS6KMP) || defined(IMA_AIX4MP)
  678. #ifdef    IMA_SUNMP
  679.                 cond_signal(&dboxp->mb_cond);
  680. #endif
  681. #if defined(IMA_RS6KMP) || defined(IMA_AIX4MP)
  682.                 pthread_cond_signal(&dboxp->mb_cond);
  683. #endif
  684. #else
  685.                 peer_wake(pe);
  686. #endif
  687.                 dboxp->mb_sleep = 0;
  688.             }
  689.  
  690.             PAGEUNLOCK(&dboxp->mb_lock);
  691.             pk_free(pp);
  692.             return 0;
  693.  
  694.         } else
  695.             LISTPUTBEFORE(ovfpkts, pp, pk_link, pk_rlink);
  696.         PAGEUNLOCK(&dboxp->mb_lock);
  697.  
  698.     } else
  699.         LISTPUTBEFORE(ovfpkts, pp, pk_link, pk_rlink);
  700.     return 1;
  701. }
  702.  
  703.  
  704. /*    mpp_probe()
  705. *
  706. *    Try to send buffered packets that couldn't be delivered before.
  707. *    Update state of task from NOTREADY to SOCKET if it has socket connection.
  708. *    XXX shouldn't be done here, why not in loclconn.
  709. *
  710. *    Returns 1 if packets ready for receipt, else 0.
  711. */
  712.  
  713. int
  714. mpp_probe()
  715. {
  716.     struct pkt *pp, *pp2, *tosend;
  717.     int dst;
  718.     struct task *tp;
  719.     int hasmsg;
  720.     struct msgboxhdr *inbp = (struct msgboxhdr *)pvminbox;
  721.     struct pidtidhdr *pvminfo = (struct pidtidhdr *)(infopage + PVMPAGEHDR);
  722.     int ntids, i;
  723.  
  724.     tosend = ovfpkts;
  725.  
  726.     ovfpkts = pk_new(0);
  727.     ovfpkts -> pk_link = ovfpkts -> pk_rlink = ovfpkts;
  728.  
  729.     while ((pp = tosend->pk_link) != tosend) {
  730.         LISTDELETE(pp, pk_link, pk_rlink);
  731.         dst = pp->pk_dst;
  732.         if (tp = task_find(dst)) {
  733.             if (tp->t_sock < 0) {
  734.                 if (mpp_output(tp, pp)) {
  735.                     for (pp = tosend->pk_link; pp != tosend; pp = pp2) {
  736.                         pp2 = pp->pk_link;
  737.                         if (pp->pk_dst == dst) {
  738.                             LISTDELETE(pp, pk_link, pk_rlink);
  739.                             LISTPUTBEFORE(ovfpkts, pp, pk_link, pk_rlink);
  740.                         }
  741.                     }
  742.                 }
  743.  
  744.             } else
  745.                 pkt_to_task(tp, pp);
  746.  
  747.         } else {
  748.             sprintf(pvmtxt, "mpp_probe() pkt from t%x to t%x scrapped",
  749.                     pp->pk_src, pp->pk_dst);
  750.             pvmlogperror(pvmtxt);
  751.             pk_free(pp);
  752.         }
  753.     }
  754.  
  755.     pk_free(tosend);
  756.  
  757.     ntids = min(maxpidtid, ((struct shmpghdr *)infopage)->pg_ref);
  758.     for (i = 0; i < ntids; i++)
  759.         if (pidtids[i].pt_stat == ST_NOTREADY
  760.         && (tp = task_find(pidtids[i].pt_tid)) && tp->t_sock >= 0)
  761.             pidtids[i].pt_stat = ST_SOCKET;
  762.  
  763.     hasmsg = (inbp->mb_read != inbp->mb_last);
  764.     return hasmsg;
  765. }
  766.  
  767.  
  768. /*    mpp_cleanup()
  769. *
  770. *    We're bailing out.  All hands on deck. Remove our shared segment and 
  771. *   as many segments and semaphores as we can for our tasks.
  772. *
  773. */
  774.  
  775. void
  776. mpp_cleanup()
  777. {
  778.     struct peer *pp;
  779.     struct shmid_ds shmds;
  780.  
  781. /*
  782.     if (pvminbox && shmdt(pvminbox) == -1)
  783.         pvmlogperror("mpp_cleanup() shmdt inbox");
  784. */
  785.  
  786.     if (myshmbufid != -1
  787.     && shmctl(myshmbufid, IPC_RMID, (struct shmid_ds *)0) == -1)
  788.         pvmlogperror("mpp_cleanup() shmctl IPC_RMID mybuf");
  789.  
  790.     if (peers) {
  791.         for (pp = peers->p_link; pp != peers; pp = pp->p_link) {
  792.             if (pp->p_buf) {
  793.                 shmdt(pp->p_buf);
  794.                 pp->p_buf = 0;
  795.             }
  796.  
  797.             if (pp->p_shmid == -1 && pp->p_key)
  798.                 pp->p_shmid = shmget((key_t)pp->p_key, shmbufsiz, 0);
  799.             if (pp->p_shmid != -1 &&
  800.             shmctl(pp->p_shmid, IPC_RMID, (struct shmid_ds *)0) == -1) {
  801.                 sprintf(pvmtxt, "shmctl id=0x%x", pp->p_shmid);
  802.                 pvmlogperror(pvmtxt);
  803.             }
  804.  
  805. #ifdef    USERECVSEMAPHORE
  806.             if (pp->p_semid == -1 && pp->p_key)
  807.                 pp->p_semid = semget((key_t)pp->p_key, 1, PERMS);
  808.             if (pp->p_semid != -1 && semctl(pp->p_semid, 0, IPC_RMID) == -1) {
  809.                 sprintf(pvmtxt, "semctl id=0x%x", pp->p_semid);
  810.                 pvmlogperror(pvmtxt);
  811.             }
  812. #endif
  813.             pp->p_key = 0;
  814.         }
  815.     }
  816. }
  817.  
  818.  
  819. pidtid_dump()
  820. {
  821.     int i;
  822.     char *s;
  823.  
  824.     pvmlogerror("pidtid_dump()\n");
  825.     for (i = 0; i < maxpidtid; i++) {
  826.         switch (pidtids[i].pt_stat) {
  827.         case ST_EXIT:
  828.             s = 0;
  829.             break;
  830.  
  831.         case ST_NOTREADY:
  832.             s = "NOTREADY";
  833.             break;
  834.  
  835.         case ST_SHMEM:
  836.             s = "SHMEM";
  837.             break;
  838.  
  839.         case ST_SOCKET:
  840.             s = "SOCKET";
  841.             break;
  842.  
  843.         case ST_FINISH:
  844.             s = "FINISH";
  845.             break;
  846.  
  847.         default:
  848.             s = "UNKNOWN";
  849.             break;
  850.         }
  851.  
  852.         if (s) {
  853.             sprintf(pvmtxt, "%4d pid %d tid %x ptid %x stat %s key 0x%x",
  854.                     i,
  855.                     pidtids[i].pt_pid,
  856.                     pidtids[i].pt_tid,
  857.                     pidtids[i].pt_ptid,
  858.                     s,
  859.                     pidtids[i].pt_key);
  860. #ifdef IMA_CSPP
  861.             sprintf(pvmtxt + strlen(pvmtxt), " node %d\n", pidtids[i].pt_node);
  862. #else
  863.             strcat(pvmtxt, "\n");
  864. #endif
  865.             pvmlogerror(pvmtxt);
  866.         }
  867.     }
  868.     return 0;
  869. }
  870.  
  871.  
  872. /*    mpp_setstatus()
  873. *
  874. *    Take snapshot of task conditions and set flags in task records.
  875. */
  876.  
  877. int
  878. mpp_setstatus(tid)
  879.     int tid;            /* not used right now */
  880. {
  881.     int i;
  882.     struct task *tp;
  883.  
  884.     PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
  885.     for (i = 0; i < maxpidtid; i++)
  886.         if (pidtids[i].pt_stat != ST_EXIT)
  887.             if (tp = task_find(pidtids[i].pt_tid)) {
  888.                 tp->t_flag &= ~TF_DEADSND;
  889.                 if (pidtids[i].pt_cond)
  890.                     tp->t_flag |= TF_DEADSND;
  891.             }
  892.     PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  893.     return 0;
  894. }
  895.  
  896.  
  897. /*    mpp_dredge()
  898. *
  899. *    Dredge pidtid table for zombies, with pt_stat == ST_FINISH.
  900. *    Exit the task cleanly if it still exists and recycle the table entry.
  901. */
  902.  
  903. int
  904. mpp_dredge()
  905. {
  906.     int i;
  907.     struct task *tp;
  908.  
  909.     PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
  910.     for (i = 0; i < maxpidtid; i++)
  911.         if (pidtids[i].pt_stat == ST_FINISH) {
  912.             if (tp = task_find(pidtids[i].pt_tid)) {
  913.                 PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  914.                 task_cleanup(tp);
  915.                 task_free(tp);
  916.                 PAGELOCK(&((struct shmpghdr *)infopage)->pg_lock);
  917.             } else
  918.                 pidtids[i].pt_stat = ST_EXIT;
  919.         }
  920.     PAGEUNLOCK(&((struct shmpghdr *)infopage)->pg_lock);
  921.     return 0;
  922. }
  923.  
  924.  
  925. /*    ppi_load()
  926. *
  927. *    Fork and exec new tasks.  Give them pidtid table entries.
  928. */
  929.  
  930. int
  931. ppi_load(wxp)
  932.     struct waitc_spawn *wxp;
  933. {
  934.     int i;
  935.     struct task *tp;
  936.     int err = 0;
  937.  
  938.     for (i = 0; i < wxp->w_veclen; i++) {
  939.         if (err) {
  940.             wxp->w_vec[i] = err;
  941.  
  942.         } else {
  943.             if (err = forkexec(wxp->w_flags, wxp->w_argv[0], wxp->w_argv,
  944.                     wxp->w_nenv, wxp->w_env, 
  945.                     (wxp->w_instance+i), wxp->w_hosttotal, wxp->w_outof, &tp)) {
  946.                 wxp->w_vec[i] = err;
  947.  
  948.             } else {
  949.                 tp->t_ptid = wxp->w_ptid;
  950.                 tp->t_outtid = wxp->w_outtid;
  951.                 tp->t_outctx = wxp->w_outctx;
  952.                 tp->t_outtag = wxp->w_outtag;
  953.                 tp->t_trctid = wxp->w_trctid;
  954.                 tp->t_trcctx = wxp->w_trcctx;
  955.                 tp->t_trctag = wxp->w_trctag;
  956.                 tp->t_sched = wxp->w_sched;
  957.                 mpp_conn(tp);    /* XXX this can fail, dunno how to clean up */
  958.                 wxp->w_vec[i] = tp->t_tid;
  959.             }
  960.         }
  961.     }
  962.     return 0;
  963. }
  964.  
  965.  
  966. /* XXX this is just a basic copy of the pvmdunix code so we can at least compile */
  967. int
  968. ppi_kill(tp, signum)
  969.     struct task *tp;
  970.     int signum;
  971. {
  972.     if (tp->t_pid)
  973.         (void)kill(tp->t_pid, signum);
  974.     else
  975.         pvmlogprintf("ppi_kill() signal for t%x scrapped (pid = 0)\n",
  976.                 tp->t_tid);
  977.     return 0;
  978. }
  979.  
  980.  
  981. /* write any packets waiting the tp -> t_txq, send queue, these may 
  982. have been stacked up before the task state changed to  TF_SHMCONN
  983. This is currently called by tm_getopt when the state change is
  984. marked from TF_PRESHMCONN to TF_SHMCONN
  985. */
  986. int
  987. shm_wrt_pkts(tp)
  988. struct task * tp;
  989. {
  990. struct pkt *pp, *pp2, *pptmp;
  991. struct pkt tmplist;
  992.  
  993.     
  994.     if (tp -> t_txq)
  995.     {
  996.         pptmp = &tmplist;
  997.         pptmp -> pk_link = pptmp->pk_rlink = pptmp;
  998.  
  999.         /* Take packets off of the t_txq */
  1000.         pp = tp -> t_txq -> pk_link;
  1001.         while (pp != tp -> t_txq)
  1002.         {
  1003.             pp2 = pp;
  1004.             pp = pp -> pk_link;
  1005.             LISTDELETE(pp2, pk_link, pk_rlink);
  1006.             LISTPUTBEFORE(pptmp, pp2, pk_link, pk_rlink);
  1007.         }
  1008.  
  1009.         pp = pptmp -> pk_link;
  1010.  
  1011.         while (pp != pptmp)
  1012.         {
  1013.             pp2 = pp;
  1014.             pp = pp -> pk_link;
  1015.             LISTDELETE(pp2, pk_link, pk_rlink);
  1016.             if (tp -> t_flag & TF_SHMCONN)
  1017.             {
  1018.                 if ( pvmdebmask & PDMPACKET)
  1019.                     pvmlogprintf("shm_wrt_pkts for %x \n", tp -> t_tid);
  1020.                 mpp_output(tp, pp2);
  1021.             }
  1022.             else
  1023.                 break;
  1024.         }    
  1025.     }
  1026.         
  1027.     return 0;
  1028. }
  1029.     
  1030.